Java分布式环境下并发编程实践

您所在的位置:网站首页 并发和分布式程序设计原理 1990 Java分布式环境下并发编程实践

Java分布式环境下并发编程实践

2024-03-18 18:22| 来源: 网络整理| 查看: 265

欢迎来到笑小枫的世界,喜欢的朋友关注一下我呦,大伙的支持,就是我坚持写下去的动力。

欢迎关注我的微信公众号:【笑小枫】

也欢迎各位大佬互动,一起学习,共同进步

微信公众号【笑小枫】专属目录 一、线程安全二、锁三、死锁四、分布式系统下的并发编程挑战五、分布式锁的实现1. Redis实现分布式锁的原理2. Redis实现分布式锁的代码实现3. Redisson实现分布式锁的代码实现 六、分布式事务的实现1. XA协议的原理2. XA协议的代码实现 七、总结

在Java中,我们可以使用线程来实现并发编程,但是在多线程编程中,我们需要考虑线程安全、锁、死锁等问题。本文将介绍Java中的并发编程,包括线程安全、锁、死锁等内容,同时提供实际的代码案例,让读者更容易理解和掌握。

随着分布式系统越来越普及,分布式系统中的并发编程成为了一个重要的话题。Java作为一种高级编程语言,其并发编程能力得到了广泛的认可。但在分布式系统中,Java并发编程面临着一些新的挑战。本文将介绍在分布式系统下Java并发编程的一些技术和实际案例。

image-20230409205028209

一、线程安全

在多线程编程中,线程安全是一个重要的问题。如果多个线程同时访问同一个共享资源,就会出现线程安全问题。例如,在银行账户转账时,如果多个线程同时对同一个账户进行操作,就会出现线程安全问题。

解决线程安全问题的方法之一是使用synchronized关键字。synchronized关键字可以将代码块或方法锁定,保证同一时间只有一个线程可以执行该代码块或方法。

下面是一个使用synchronized关键字的示例:

public class Counter { private int count; public synchronized void increment() { count++; } public synchronized void decrement() { count--; } public int getCount() { return count; } }

在这个示例中,Counter类有两个方法increment()和decrement(),它们都是使用synchronized关键字来保证线程安全。这样,同一时间只有一个线程可以执行increment()和decrement()方法。

二、锁

在Java中,锁是一种同步机制,可以用于控制多个线程对共享资源的访问。Java中的锁有两种类型:内置锁和显式锁。

内置锁是Java中的一个特殊对象,每个对象都有一个内置锁。可以使用synchronized关键字来获取内置锁。例如:

public synchronized void increment() { count++; }

在这个示例中,synchronized关键字获取了Counter对象的内置锁。这样,在同一时间只有一个线程可以访问increment()方法。

显式锁是Java中的另一种锁类型,可以使用java.util.concurrent.locks包中的Lock接口来实现。与内置锁不同,显式锁提供了更多的灵活性和控制。例如:

public class Counter { private int count; private Lock lock = new ReentrantLock(); public void increment() { lock.lock(); try { count++; } finally { lock.unlock(); } } public void decrement() { lock.lock(); try { count--; } finally { lock.unlock(); } } public int getCount() { return count; } }

在这个示例中,Counter类使用ReentrantLock类来创建一个显式锁。increment()和decrement()方法获取锁并释放锁。这样,在同一时间只有一个线程可以访问increment()和decrement()方法。

三、死锁 image-20230409205211771

死锁是多线程编程中的一种问题,它发生在两个或多个线程互相等待对方释放锁的情况下。例如:

public class DeadlockExample { private final Object lock1 = new Object(); private final Object lock2 = new Object(); public void method1() { synchronized (lock1) { // do something synchronized (lock2) { // do something } } } public void method2() { synchronized (lock2) { // do something synchronized (lock1) { // do something } } } }

在这个示例中,DeadlockExample类有两个方法method1()和method2(),它们都使用两个锁lock1和lock2。如果一个线程调用method1()方法并获取了lock1锁,另一个线程调用method2()方法并获取了lock2锁,那么两个线程都无法继续执行,因为它们都在等待对方释放锁。这就是死锁。

避免死锁的方法之一是使用定时锁。定时锁可以在一定时间内自动释放锁,避免死锁。例如:

public class DeadlockExample { private final Object lock1 = new Object(); private final Object lock2 = new Object(); private final Lock timedLock1 = new ReentrantLock(); private final Lock timedLock2 = new ReentrantLock(); public void method1() { timedLock1.lock(); try { // do something if (timedLock2.tryLock(500, TimeUnit.MILLISECONDS)) { try { // do something } finally { timedLock2.unlock(); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { timedLock1.unlock(); } } public void method2() { timedLock2.lock(); try { // do something if (timedLock1.tryLock(500, TimeUnit.MILLISECONDS)) { try { // do something } finally { timedLock1.unlock(); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { timedLock2.unlock(); } } }

在这个示例中,DeadlockExample类使用定时锁timedLock1和timedLock2来避免死锁。如果一个线程调用method1()方法并获取了timedLock1锁,另一个线程调用method2()方法并获取了timedLock2锁,那么它们会等待一段时间,如果在这段时间内无法获取到对方的锁,就会自动释放自己的锁,避免死锁。

四、分布式系统下的并发编程挑战 image-20230409205403525

在分布式系统中,由于不同的节点之间通过网络进行通信,因此会带来以下一些挑战:

网络延迟

在分布式系统中,由于节点之间通过网络进行通信,因此会存在网络延迟。这会导致节点之间的通信变慢,从而影响并发编程的效率。为了解决这个问题,可以采用异步编程模型,即通过回调函数的方式来处理网络通信。

数据一致性

在分布式系统中,由于数据分布在不同的节点上,因此会存在数据一致性的问题。如果不同节点上的数据不一致,就会导致系统出现异常。为了解决这个问题,可以采用分布式锁或者分布式事务来保证数据一致性。

容错性

在分布式系统中,由于节点之间存在网络通信,因此会存在节点宕机的情况。为了保证系统的容错性,需要采用一些容错机制,例如备份节点、自动故障转移等。

五、分布式锁的实现

在分布式系统中,为了保证数据一致性,需要采用分布式锁来控制对共享资源的访问。下面介绍一种基于Redis实现的分布式锁。

image-20230409205625391

笑小枫友情链接,感兴趣的进 更详细的分布式锁可以参考文章:分布式锁的 3 种实现方案

1. Redis实现分布式锁的原理

Redis是一个高性能的键值存储系统,支持多种数据结构,例如字符串、哈希表、列表等。Redis提供了一种原子性的操作,可以实现分布式锁。

实现分布式锁的原理如下:

1)客户端向Redis发送一个SETNX命令,尝试去设置一个key的值为1,如果这个key不存在,则设置成功,否则设置失败。

2)客户端设置了这个key的值为1之后,就拥有了这个锁。

3)其他客户端也可以向Redis发送SETNX命令,尝试去设置这个key的值为1,但是由于这个key已经存在了,因此设置失败。

4)当客户端完成了对共享资源的访问之后,需要将这个key删除,以便其他客户端可以获得这个锁。

2. Redis实现分布式锁的代码实现

下面是基于Redis实现分布式锁的代码实现:

public class RedisDistributedLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private static final String LOCK_PREFIX = "lock:"; private JedisPool jedisPool; public RedisDistributedLock(JedisPool jedisPool) { this.jedisPool = jedisPool; } public boolean tryLock(String key, String requestId, int expireTime) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String lockKey = LOCK_PREFIX + key; String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } } finally { if (jedis != null) { jedis.close(); } } return false; } public boolean releaseLock(String key, String requestId) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String lockKey = LOCK_PREFIX + key; String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } } finally { if (jedis != null) { jedis.close(); } } return false; } } 3. Redisson实现分布式锁的代码实现

当然,这里介绍的是比较原生的方式,我们也可以直接使用Redisson框架封装的分布式锁。

Redisson是一个基于Redis的Java客户端,提供了丰富的分布式数据结构和服务。其中就包括分布式锁的实现,下面介绍一下如何使用Redisson实现分布式锁。

引入Redisson依赖 org.redisson redisson 3.15.5 使用分布式锁

好的,下面提供一个更详细的代码示例:

import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import java.util.concurrent.TimeUnit; public class DistributedLockDemo { public static void main(String[] args) { // 创建Redisson客户端 Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redisson = Redisson.create(config); // 获取分布式锁 RLock lock = redisson.getLock("mylock"); try { // 尝试获取锁,等待时间为10秒,锁的有效期为5秒 boolean isLocked = lock.tryLock(10, 5, TimeUnit.SECONDS); if (isLocked) { // 获取锁成功,执行业务逻辑 System.out.println("获取锁成功,执行业务逻辑..."); Thread.sleep(3000); // 模拟业务逻辑耗时 } else { // 获取锁失败,处理异常情况 System.out.println("获取锁失败,处理异常情况..."); } } catch (Exception e) { // 处理异常情况 System.out.println("处理异常情况..."); } finally { // 释放锁 lock.unlock(); System.out.println("释放锁..."); } // 关闭Redisson客户端 redisson.shutdown(); } }

在上面的代码中,我们首先创建了一个RedissonClient对象,然后通过该对象获取一个RLock对象。在try...catch...finally代码块中,我们调用tryLock方法尝试获取锁,如果获取成功就执行业务逻辑;否则就处理异常情况。最后,在finally代码块中释放锁,并关闭RedissonClient对象。

需要注意的是,在实际应用中,我们需要将上面的代码封装成一个可重入的分布式锁工具类,方便各个业务模块使用。

六、分布式事务的实现

在分布式系统中,为了保证数据一致性,需要采用分布式事务来控制对共享资源的访问。下面介绍一种基于XA协议实现的分布式事务。

image-20230409205723358

笑小枫友情链接,感兴趣的进 具体事务相关可以参考文章:七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)

1. XA协议的原理

XA协议是一种分布式事务协议,可以用于协调多个数据库的事务。XA协议的原理如下:

1)事务管理器向数据库发送XA START命令,开始一个分布式事务。

2)事务管理器向数据库发送XA END命令,结束一个分布式事务。

3)事务管理器向数据库发送XA PREPARE命令,准备提交一个分布式事务。

4)如果所有数据库都准备好提交事务,则事务管理器向数据库发送XA COMMIT命令,提交分布式事务。

5)如果有任何一个数据库无法提交事务,则事务管理器向所有数据库发送XA ROLLBACK命令,回滚分布式事务。

2. XA协议的代码实现

下面是基于XA协议实现分布式事务的代码实现:

public class XADistributedTransaction { private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; private static final String DB_URL = "jdbc:mysql://localhost:3306/test"; private static final String USER = "root"; private static final String PASSWORD = "root"; private static final String XA_DATASOURCE_CLASSNAME = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"; private static final String XA_DATASOURCE_URL = "jdbc:mysql://localhost:3306/test"; private static final String XA_DATASOURCE_USER = "root"; private static final String XA_DATASOURCE_PASSWORD = "root"; private static final String XID_PREFIX = "xa_"; private static final String TABLE_NAME = "account"; private static final String TABLE_SCHEMA = "CREATE TABLE account (id INT PRIMARY KEY, balance INT)"; private static final String INSERT_SQL = "INSERT INTO account (id, balance) VALUES (?, ?)"; private static final String UPDATE_SQL = "UPDATE account SET balance = ? WHERE id = ?"; public void transferMoney(int fromId, int toId, int amount) throws SQLException { XADataSource xaDataSource = getXADataSource(); Connection connection = xaDataSource.getXAConnection().getConnection(); Xid xid = createXid(); try { connection.setAutoCommit(false); XAResource xaResource = getXAResource(connection); xaResource.start(xid, XAResource.TMNOFLAGS); try (PreparedStatement preparedStatement = connection.prepareStatement(UPDATE_SQL)) { preparedStatement.setInt(1, getBalance(connection, fromId) - amount); preparedStatement.setInt(2, fromId); preparedStatement.executeUpdate(); } xaResource.end(xid, XAResource.TMSUCCESS); xaResource.start(xid, XAResource.TMNOFLAGS); try (PreparedStatement preparedStatement = connection.prepareStatement(UPDATE_SQL)) { preparedStatement.setInt(1, getBalance(connection, toId) + amount); preparedStatement.setInt(2, toId); preparedStatement.executeUpdate(); } xaResource.end(xid, XAResource.TMSUCCESS); int prepare = xaResource.prepare(xid); if (prepare == XAResource.XA_OK) { xaResource.commit(xid, false); } else { xaResource.rollback(xid); } connection.commit(); } catch (SQLException | XAException e) { connection.rollback(); throw e; } finally { connection.close(); } } private XADataSource getXADataSource() throws SQLException { MysqlXADataSource xaDataSource = new MysqlXADataSource(); xaDataSource.setUrl(XA_DATASOURCE_URL); xaDataSource.setUser(XA_DATASOURCE_USER); xaDataSource.setPassword(XA_DATASOURCE_PASSWORD); return xaDataSource; } private Xid createXid() throws XAException { byte[] gtrid = new byte[10]; byte[] bqual = new byte[10]; Arrays.fill(gtrid, (byte) 9); Arrays.fill(bqual, (byte) 9); return new XidImpl(0x1234, gtrid, bqual); } private XAResource getXAResource(Connection connection) throws SQLException { return connection.unwrap(XAResource.class); } private int getBalance(Connection connection, int id) throws SQLException { try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT balance FROM account WHERE id = ?")) { preparedStatement.setInt(1, id); try (ResultSet resultSet = preparedStatement.executeQuery()) { if (resultSet.next()) { return resultSet.getInt("balance"); } } } throw new RuntimeException("Account not found: " + id); } public void init() throws SQLException { try (Connection connection = DriverManager.getConnection(DB_URL, USER, PASSWORD)) { try (Statement statement = connection.createStatement()) { statement.executeUpdate("DROP TABLE IF EXISTS account"); statement.executeUpdate(TABLE_SCHEMA); try (PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL)) { preparedStatement.setInt(1, 1); preparedStatement.setInt(2, 1000); preparedStatement.executeUpdate(); } try (PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL)) { preparedStatement.setInt(1, 2); preparedStatement.setInt(2, 1000); preparedStatement.executeUpdate(); } } } } } 七、总结

本文介绍了Java中的并发编程,包括线程安全、锁、死锁等内容。在多线程编程中,线程安全是一个重要的问题,可以使用synchronized关键字或显式锁来实现。死锁是一个常见的问题,可以使用定时锁来避免。多线程编程需要仔细考虑线程安全和锁的问题,才能保证程序的正确性和性能。

并且讲解了在分布式系统下Java并发编程的一些技术和实际案例。在分布式系统中,Java并发编程需要面对网络延迟、数据一致性和容错性等挑战,需要采用一些技术和机制来解决这些问题。例如,可以采用基于Redis实现的分布式锁来控制对共享资源的访问,也可以采用基于XA协议实现的分布式事务来保证数据一致性。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3